---
layout: post
date: 2013-06-20
title: "Concurrent-Friendly Logging with Golang"
description: "Leveraging Go slices and channels to do logging under high concurrency."
tags: [concurrency, devops, golang, logging]
---

<p>Although we managed to <a href=/scaling-viki/>launch Viki's new platform</a> relatively flawlessly, we completely failed to execute on our post-launch roadmap. Our goal had been to focus on polishing search and recommendation. Months behind schedule, we've finally started doing some initial work on this. At a high level, the project can be split into three phases: first we need to collect data, next we need to analyze it, and finally we need to feed it back into the system.</p>

<p>For now, we're focusing on collecting some basic data. I think working with real data from the get-go will be pretty helpful as we move forward with the other, more complicated, phases. Our approach to collecting the data, admittedly not the hardest problem in the world, is inspired by the rest of our architecture: distributed and asynchronous. Essentially, an event comes in via HTTP, an event object is generated as a byte array (protocol buffers), it's buffered in memory, and the buffer is periodically flushed to disk. A background process picks these files up and sends them to central servers, which can take their time denormalizing the data as needed and storing it in whatever engine we use (we're thinking RethinkDB).</p>

<p>The approach has a few benefits. First, there's no single point of failure. If the central servers become inaccessible, we can queue up on the edges (so far our data isn't very heavy, so a couple of TB of local storage can last for a long time). Second, all of the heavy work, including flushing the buffer to disk, is done outside of the code path that handles the HTTP request. This hopefully means we can process tens of thousands of messages per second with virtually no impact on performance.</p>

<p>The code that takes an HTTP request and turns it into a <code>[]byte</code> is pretty specific to our needs. The code to buffer and flush isn't (it also hasn't been tested or put into production yet, but it soon will be). To avoid blocking while we're flushing to disk, we use a buffered channel to coordinate multiple workers:</p>

{% highlight go %}
package buffered

import (
  "time"
)

const workerCount = 4
var channel = make(chan []byte, 1024)
var workers = make([]*Worker, workerCount)

func init() {
  for i := 0; i < workerCount; i++ {
    workers[i] = NewWorker(i)
    go workers[i].Work(channel)
  }
}

func Log(event []byte) {
  select {
    case channel <- event:
    case <- time.After(5 * time.Second):
      // throw away the message, so sad
  }
}
{% endhighlight %}

<p>The above code does two simple things. First, it creates and starts 4 workers, each running in its own goroutine (in Go, a function named <code>init</code> is automatically executed once when the package is initialized). Second, it exposes a <code>Log</code> function which takes a byte slice (the event to log). The buffered channel named <code>channel</code> is the glue between the two pieces of code. Each worker is given a reference to the channel when it's started. This is the same channel which <code>Log</code> writes to. The channel is buffered, so it can hold up to 1024 events in memory before writes to it block (a blocked write is released when the other side, the worker, which we'll see in a second, reads from the channel). If a write blocks for more than 5 seconds, our <code>select</code> code will pick the 2nd case and simply discard the event (which is ok for us, but maybe you'd rather fail hard).</p>

<p>Next we have the workers:</p>

{% highlight go %}
package buffered

import (
  "os"
  "log"
  "time"
  "strconv"
  "io/ioutil"
)

const capacity = 32768

type Worker struct {
  fileRoot string
  buffer []byte
  position int
}

func NewWorker(id int) (w *Worker) {
  return &Worker{
    //move the root path to some config or something
    fileRoot: "/var/log/tracker/" + strconv.Itoa(id) + "_",
    buffer: make([]byte, capacity),
  }
}

func (w *Worker) Work(channel chan []byte) {
  for {
    event := <- channel
    length := len(event)
    // we run with nginx's client_max_body_size set to 2K which makes this
    // unlikely to happen, but, just in case...
    if length > capacity {
      log.Println("message received was too large")
      continue
    }
    if (length + w.position) > capacity {
      w.Save()
    }
    copy(w.buffer[w.position:], event)
    w.position += length
  }
}

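func (w *Worker) Save() {
  if w.position == 0 { return }
  // whatever happens, start the next batch with an empty buffer
  defer func() { w.position = 0 }()
  // "" means os.TempDir(), which must be on the same filesystem as fileRoot
  // for the rename below to work
  f, err := ioutil.TempFile("", "logs_")
  if err != nil {
    log.Println("failed to create temp file:", err)
    return
  }
  _, err = f.Write(w.buffer[0:w.position])
  f.Close()
  if err == nil {
    err = os.Rename(f.Name(), w.fileRoot + strconv.FormatInt(time.Now().UnixNano(), 10))
  }
  if err != nil {
    log.Println("failed to save buffer:", err)
  }
}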
{% endhighlight %}

<p>Each worker has its own <code>buffer</code>. The worker spends most of its time in the <code>Work</code> function, which reads from the buffered channel in an endless loop (blocking if there's no event). When there's enough room in the buffer for the new event, we simply copy the event into the buffer. When there isn't, we first flush the buffer to disk. Our worker has a few interesting properties.</p>

<p>First, we re-use the same <code>buffer</code> over and over again. This avoids having to constantly allocate memory, reducing the amount of GC needed and reducing fragmentation. We're able to do this by keeping track of where in the buffer we currently are and relying on Go's efficient slices to copy and write only the relevant part of the buffer.</p>
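<p>The buffer re-use can be shown in isolation. The following is a small sketch with a hypothetical <code>reusableBuffer</code> type standing in for <code>Worker</code>. The names and the tiny capacity are assumptions made for illustration. The point is that slicing <code>data[:position]</code> gives a view of the written bytes without copying, and resetting <code>position</code> makes the same memory available again.</p>

```go
package main

import "fmt"

// reusableBuffer is a hypothetical, stripped-down stand-in for Worker.
type reusableBuffer struct {
	data     []byte
	position int
}

// add copies event into the buffer and reports whether it fit.
func (b *reusableBuffer) add(event []byte) bool {
	if b.position+len(event) > len(b.data) {
		return false
	}
	copy(b.data[b.position:], event)
	b.position += len(event)
	return true
}

// pending returns a view of the bytes written so far; no copy is made.
func (b *reusableBuffer) pending() []byte {
	return b.data[:b.position]
}

// reset marks the buffer as empty without releasing its memory.
func (b *reusableBuffer) reset() {
	b.position = 0
}

func main() {
	b := &reusableBuffer{data: make([]byte, 8)}
	b.add([]byte("abc"))
	b.add([]byte("de"))
	fmt.Printf("%s\n", b.pending()) // abcde
	b.reset()
	b.add([]byte("xy"))
	fmt.Printf("%s\n", b.pending()) // xy
}
```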

<p>Second, because all coordination is achieved via the channel, our worker is thread-safe as-is. Each worker's buffer and position are only ever touched by that worker's own goroutine, so we don't need to lock access when we update <code>w.position</code> or anything like that.</p>

<p>Third, we flush to a tmp file and rename to our final destination. Unless you're doing so across devices, rename tends to be atomic. This is important as we plan on having a background process pick up new files and send them to our central server. If we wrote directly to this location, our write/flush would not be atomic and there'd be a good chance the background process would pick up a partial file.</p>

<p>Finally, we can always improve performance by increasing the size of the buffer. The bigger the buffer, the less often we have to flush, the better the performance. Unfortunately, a bigger buffer also means if we crash or something abnormal happens, we'll lose a bigger chunk of data.</p>

<p>Between having multiple workers and buffered channels, the hope is that no request is ever blocked waiting for a sync to disk. Another way to solve this would have been to swap out a full buffer for an empty buffer and flush in the background. This would only have required a short-lived lock.</p>

<p>One thing to keep in mind with this sort of approach is that events are spread out across multiple workers/files. In our case, each event has a timestamp, and it'll be the central server's job to make sure things are accessible in proper order.</p>
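<p>As a rough illustration of that last step, the central server could merge the events from each worker's file and sort them by timestamp. The <code>event</code> struct below is hypothetical (our real events are protocol buffer messages whose schema isn't shown here), and the sketch assumes everything fits in memory.</p>

```go
package main

import (
	"fmt"
	"sort"
)

// event is a hypothetical decoded record used only for this example.
type event struct {
	Timestamp int64 // assumed to be nanoseconds since the Unix epoch
	Payload   string
}

// mergeByTimestamp combines the events read from several worker files into
// one slice ordered by timestamp. Events with equal timestamps keep the
// order in which they were passed in.
func mergeByTimestamp(files ...[]event) []event {
	var merged []event
	for _, f := range files {
		merged = append(merged, f...)
	}
	sort.SliceStable(merged, func(i, j int) bool {
		return merged[i].Timestamp < merged[j].Timestamp
	})
	return merged
}

func main() {
	worker0 := []event{{1, "a"}, {4, "d"}}
	worker1 := []event{{2, "b"}, {3, "c"}}
	for _, e := range mergeByTimestamp(worker0, worker1) {
		fmt.Println(e.Timestamp, e.Payload)
	}
}
```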
